[SPARK-48484][SQL] Fix: V2Write uses the same TaskAttemptId for different task attempts #46811
Conversation
also cc @yaooqinn @HyukjinKwon @cloud-fan @pan3793
yaooqinn left a comment
+1, LGTM.
Can we have a test case?
Please remove the [MINOR] tag and file a Jira ticket for this.

  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
-   val taskAttemptContext = createTaskAttemptContext(partitionId)
+   val taskAttemptContext = createTaskAttemptContext(partitionId, realTaskId.toInt & Int.MaxValue)
is it the same as math.abs?
Yep, it is the same as Math.abs(realTaskId.toInt).
There are at least two other similar cases here; should we unify them as math.abs? Of course, that should be another PR. @cloud-fan
I don't think perf matters here, and math.abs is definitely more readable.
ok, we can unify the above three cases to math.abs in a follow-up.
@cloud-fan If an overflow occurs, realTaskId.toInt & Int.MaxValue and math.abs are not equivalent:

scala> val realTaskId = Long.MaxValue
val realTaskId: Long = 9223372036854775807
scala> val a = realTaskId.toInt
val a: Int = -1
scala> val b = realTaskId.toInt & Int.MaxValue
val b: Int = 2147483647
scala> val c = math.abs(realTaskId.toInt)
val c: Int = 1

scala> val realTaskId = Int.MaxValue.toLong + 1
val realTaskId: Long = 2147483648
scala> val a = realTaskId.toInt
val a: Int = -2147483648
scala> val b = realTaskId.toInt & Int.MaxValue
val b: Int = 0
scala> val c = math.abs(realTaskId.toInt)
val c: Int = -2147483648

Meanwhile, when an overflow occurs, math.abs may still return a negative value, so I suggest we continue using & Int.MaxValue.
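To make the REPL examples above concrete, here is a small self-contained Scala check (an illustration, not code from the PR) of why the sign-bit mask is the safer choice: math.abs(Int.MinValue) overflows and stays negative, while x & Int.MaxValue always clears the sign bit.

```scala
// Illustration only, not code from the PR: comparing math.abs with the
// Int.MaxValue mask on the two overflow cases discussed above.
object MaskVsAbs {
  def main(args: Array[String]): Unit = {
    // Long.MaxValue truncates to -1 as an Int.
    val fromMaxLong = Long.MaxValue.toInt
    assert(fromMaxLong == -1)
    assert(math.abs(fromMaxLong) == 1)
    assert((fromMaxLong & Int.MaxValue) == Int.MaxValue)

    // Int.MaxValue + 1 truncates to Int.MinValue, where abs itself overflows.
    val fromOverflow = (Int.MaxValue.toLong + 1).toInt
    assert(fromOverflow == Int.MinValue)
    assert(math.abs(fromOverflow) == Int.MinValue) // still negative!
    assert((fromOverflow & Int.MaxValue) == 0)     // mask is never negative
    println("all checks passed")
  }
}
```

Because the mask only clears the sign bit, its result is guaranteed to fall in [0, Int.MaxValue] for every input.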
Sure, I will add a follow-up for SPARK-42478 and a test suite for this PR.

No, SPARK-42478 is not part of the Spark 4.0 cycle; please use a new Jira ticket. @jackylee-ch
create SPARK-48484 for this one @jackylee-ch |
@@ -38,7 +38,7 @@ case class FileWriterFactory (
  @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)

  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
Not related to this PR, but why not name it taskAttemptId? Can realTaskId be something else?
Can we use PrivateMethodTester in FileWriterFactorySuite to avoid expanding the scope of this function?
If we just check createTaskAttemptContext, do we really need to inherit from SharedSparkSession? Can we just inherit from SparkFunSuite?
We need a Configuration here, as it will be used in createTaskAttemptContext. It's OK with me if we just create a new Configuration.
...re/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactorySuite.scala
…ent task attempts

### What changes were proposed in this pull request?
After #40064 , we always get the same TaskAttemptId for different task attempts which have the same partitionId. This would lead different task attempts to write to the same directory.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46811 from jackylee-ch/fix_v2write_use_same_directories_for_different_task_attempts.

Lead-authored-by: jackylee-ch <lijunqing@baidu.com>
Co-authored-by: Kent Yao <yao@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 67d11b1)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
Merged into master/3.5/3.4. Thanks @jackylee-ch @yaooqinn @cloud-fan @ulysses-you @yikf
What changes were proposed in this pull request?
After #40064 , we always get the same TaskAttemptId for different task attempts which have the same partitionId. This would lead different task attempts to write to the same directory.
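The collision can be sketched in a few lines of plain Scala (illustrative names, not the actual FileWriterFactory internals): when the attempt context is derived from partitionId alone, a retried attempt of the same partition maps to the same key, and therefore to the same output directory.

```scala
// Illustrative sketch, not the actual Spark code: how deriving the attempt
// id from the globally unique task id keeps retried attempts apart.
final case class AttemptKey(partitionId: Int, attemptId: Int)

object TaskAttemptSketch {
  // Before the fix: every attempt of a partition produced the same key.
  def keyBefore(partitionId: Int): AttemptKey = AttemptKey(partitionId, 0)

  // After the fix: the unique Spark task id (masked to stay nonnegative)
  // distinguishes attempts of the same partition.
  def keyAfter(partitionId: Int, realTaskId: Long): AttemptKey =
    AttemptKey(partitionId, realTaskId.toInt & Int.MaxValue)

  def main(args: Array[String]): Unit = {
    // Two attempts of partition 0, with distinct task ids 7 and 8.
    assert(keyBefore(0) == keyBefore(0))        // old behavior: collision
    assert(keyAfter(0, 7L) != keyAfter(0, 8L))  // fixed: distinct keys
  }
}
```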
Does this PR introduce any user-facing change?
No.
How was this patch tested?
GA
Was this patch authored or co-authored using generative AI tooling?
No.